Source code for hysop.core.mpi.bridge

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Tools to compute the intersection between two topologies.

`.. currentmodule : hysop.core.mpi.bridge

* :class:`~Bridge` for topologies/operators defined
  inside the same mpi communicator
* :class:`~BridgeInter`
* :class:`~BridgeOverlap` for topologies defined
  inside the same mpi parent communicator and
  with a different number of processes

"""
from hysop.constants import HYSOP_INTEGER
from hysop.topology.cartesian_topology import CartesianTopologyView, CartesianTopology
from hysop.core.mpi.topo_tools import TopoTools
from hysop.tools.misc import Utils
from hysop.core.mpi import MPI
from hysop.tools.numpywrappers import npw
from hysop.tools.htypes import check_instance, first_not_None


[docs] class Bridge: """ Intersection between two topologies. """ def __init__(self, source, target, dtype, order): """Intersection between two topologies. See users' manual for details Parameters ---------- source, target : :class:`~hysop.topology.topology.CartesianTopology` topologies that own the source mesh and targeted mesh dtype: numpy dtype to be send and received order: memory order of arrays to be send/recieved """ # -- All dictionnaries below use rank number (in parent comm) # as keys. -- # Dictionnary of indices of grid points to be received on target. self._recv_indices = {} # Dictionnary of indices of grid points to be sent from current rank self._send_indices = {} # Dictionnary of MPI derived types used for MPI receive self._recv_types = None # Dictionnary of MPI derived types used for MPI send. self._send_types = None # The communicator that will be used in this bridge. self.comm = None # current rank in this comm self._rank = None self._source = source self._target = target self._dtype = dtype self._order = order self._check_topologies() # nothing to be done ... if source == target: return self._build_send_recv_dict() def _check_topologies(self): """Check if source/target topologies exists and are complient""" check_instance(self._source, CartesianTopologyView) check_instance(self._target, CartesianTopologyView) msg = "Bridge error, both source/target topologies" msg += " must have the same parent communicator." assert TopoTools.compare_comm(self._source.parent, self._target.parent), msg # The assert above ensure that source and target hold the same # group of process in the same communication context. self.comm = self._source.parent self._rank = self.comm.Get_rank() def _build_send_recv_dict(self): """Compute local (mpi) intersection of two topologies i.e. find which grid points are on both source and target mesh. """ # Get global indices of the mesh on source for all mpi processes. indices_source = TopoTools.gather_global_indices(self._source) # Get global indices of the mesh on target for all mpi processes. indices_target = TopoTools.gather_global_indices(self._target) # From now on, we have indices_source[rk] = global indices (slice) # of grid points of the source on process number rk in parent. # And the same thing for indices_target. # Compute the intersections of the mesh on source with every mesh on # target ---> find which part of the local mesh must be sent to who, # which results in the self._send_indices dict. # self._send_indices[i] = [slice(...), slice(...), slice(...)] # means that the current process must send to process i the grid points # defined by the slices above. current = indices_source[self._rank] for rk in indices_target: inter = Utils.intersect_slices(current, indices_target[rk]) if inter is not None: self._send_indices[rk] = inter # Back to local indices convert = self._source.mesh.global_to_local self._send_indices = { rk: convert(self._send_indices[rk]) for rk in self._send_indices } # Compute the intersections of the mesh on target with every mesh on # source ---> find which part of the local mesh must recv something # and from who, # which results in the self._recv_indices dict. # self._recv_indices[i] = [slice(...), slice(...), slice(...)] # means that the current process must recv from process i # the grid points defined by the slices above. current = indices_target[self._rank] for rk in indices_source: inter = Utils.intersect_slices(current, indices_source[rk]) if inter is not None: self._recv_indices[rk] = inter convert = self._target.mesh.global_to_local self._recv_indices = { rk: convert(self._recv_indices[rk]) for rk in self._recv_indices }
[docs] def has_local_inter(self): """True if local mesh points are also present on remote mesh""" return self._rank in self._send_indices
[docs] def local_source_ind(self): """indices of points (in the local mesh) that also belong to remote mesh """ if self._rank in self._send_indices: return self._send_indices[self._rank] else: return {}
[docs] def local_target_ind(self): """indices of points (in the remote mesh) that also belong to the local mesh """ if self._rank in self._recv_indices: return self._recv_indices[self._rank] else: return {}
[docs] def recv_types(self): """Returns the dictionnary of MPI derived types received on targeted topology. """ if self._recv_types is None: data_shape = self._target.mesh.local_resolution self._recv_types = TopoTools.create_subarray( self._recv_indices, data_shape, dtype=self._dtype, order=self._order ) return self._recv_types
[docs] def send_types(self): """Returns the dictionnary of MPI derived types sent by source topology.""" if self._send_types is None: data_shape = self._source.mesh.local_resolution self._send_types = TopoTools.create_subarray( self._send_indices, data_shape, dtype=self._dtype, order=self._order ) return self._send_types
[docs] class BridgeInter: """Intersection between two topologies defined on two different mpi communicators. """ def __init__(self, current, source_id, target_id, dtype, order): """Intersection between two topologies defined on different mpi communicators (i.e. implies mpi intercomm) See users' manual for details Parameters ---------- current : :class:`~hysop.topology.topology.CartesianTopology` source_id, target_id : int mpi task ids for the source/target. Required if source/target is None else infered from source/target. dtype: numpy dtype to be send and received order: memory order of arrays to be send/recieved """ check_instance(current, CartesianTopologyView) check_instance(source_id, (int, HYSOP_INTEGER)) check_instance(target_id, (int, HYSOP_INTEGER)) # The aim of a bridge if to compute the intersection of mesh grids # on source topology with those on target topology, to be able to tell # who must send/recv what to which process. # This is done in steps: # - the indices of grid points of each process are gathered # onto the root process, for both source and target --> global_indices. # We compute global indices (i.e. relative to the global grid) # - an intercommunicator is used to broadcast these indices # from source to the processes of target. # source task number self.source_id = source_id # target task number self.target_id = target_id assert isinstance(current, CartesianTopologyView) domain = current.domain parent = domain.parent_comm assert isinstance(parent, MPI.Intracomm) self._rank = parent.Get_rank() self._topology = current # current task id current_task = self._topology.domain.current_task() # True if current process is in the 'from' group' self._task_is_source = current_task == self.source_id # True if current process is in the 'to' group self._task_is_target = current_task == self.target_id # Ensure that current process belongs to one and only one task. assert self._task_is_source or self._task_is_target assert not (self._task_is_source and self._task_is_target) # Get the appropriate intercommunicator self.comm = domain.task_intercomm( target_id if self._task_is_source else source_id ) convert = self._topology.mesh.global_to_local current_indices, remote_indices = self._swap_indices() _transfer_indices = {} current = current_indices[domain.task_rank()] for rk in remote_indices: inter = Utils.intersect_slices(current, remote_indices[rk]) if inter is not None: _transfer_indices[rk] = inter # Back to local indices self._transfer_indices = { rk: convert(_transfer_indices[rk]) for rk in _transfer_indices } self._transfer_types = None self._dtype = dtype self._order = order def _swap_indices(self): """collect current/remote indices""" # First, we need to collect the global indices, as arrays # since we need to broadcast them later. current_indices = TopoTools.gather_global_indices(self._topology, toslice=False) # To allocate remote_indices array, we need the size of # the remote communicator. remote_size = self.comm.Get_remote_size() dimension = self._topology.domain.dim remote_indices = npw.dim_zeros((dimension * 2, remote_size)) # Then they are broadcasted to the remote communicator rank = self._topology.domain.task_rank() current_task = self._topology.domain.current_task() if self._task_is_source: self.comm.bcast( current_indices, root=MPI.ROOT if rank == 0 else MPI.PROC_NULL ) recv = self.comm.bcast(remote_indices, root=0) remote_indices[...] = recv if self._task_is_target: recv = self.comm.bcast(remote_indices, root=0) remote_indices[...] = recv self.comm.bcast( current_indices, root=MPI.ROOT if rank == 0 else MPI.PROC_NULL ) gh = self._topology.ghosts # Convert numpy arrays to dict of slices ... current_indices = Utils.array_to_dict(current_indices) remote_indices = Utils.array_to_dict(remote_indices) return current_indices, remote_indices
[docs] def transfer_types(self, task_id=None): """Return the dictionnary of MPI derived types used for send (if on source) or receive (if on target) """ if self._transfer_types is None: data_shape = self._topology.mesh.local_resolution self._transfer_types = TopoTools.create_subarray( self._transfer_indices, data_shape, dtype=self._dtype, order=self._order ) return self._transfer_types
[docs] def transfer_indices(self, task_id=None): """Return the dictionnary of transfer local indices used for send (if on source) or receive (if on target) """ return self._transfer_indices
[docs] class BridgeOverlap(Bridge): """ Bridge between two topologies that: - have a different number of mpi processes - have common mpi processes i.e. something in between a standard bridge with intra-comm and a bridge dealing with intercommunication. The main difference with a standard bridge is that this one may be call on processes where either source or target does not exist. """ def __init__(self, source_id, target_id, comm_ref=None, **kwds): """Bridge between two topologies that: * have a different number of mpi processes * have common mpi processes Parameters ---------- comm_ref : MPI.COMM mpi communicator used for all global communications. It must include all processes of source and target. If None, source.parent is used. Notes ----- this is something in between a standard bridge with intra-comm and a bridge dealing with intercommunication. This is probably a very pathologic case ... The main difference with a standard bridge is that this one may be call on processes where either source or target does not exist. """ check_instance(comm_ref, MPI.Intracomm, allow_none=True) self._comm_ref = comm_ref self.domain = None self._source_task_id, self._target_task_id = source_id, target_id super().__init__(**kwds) def _check_topologies(self): check_instance(self._source, CartesianTopologyView, allow_none=True) check_instance(self._target, CartesianTopologyView, allow_none=True) # First check if source and target are complient if self.comm is None: if self._comm_ref: self.comm = self._comm_ref elif self._source is not None: self.comm = self._source.domain.parent_comm else: self.comm = self._target.domain.parent_comm # To build a bridge, all process in source/target must be in self.comm # and there must be an overlap between source # and target processes group. If not, turn to intercommunicator. intersec_size = TopoTools.intersection_size( self.comm if self._source is None else self._source.comm, self.comm if self._target is None else self._target.comm, ) if self._source is not None and self._target is not None: msg = "BridgeOverlap error: mpi group from " msg += "source and topo must overlap. If not " msg += "BridgeInter will probably suits better." assert intersec_size > 0, msg elif self._source is not None: assert isinstance(self._source, CartesianTopologyView) s_size = self._source.comm.Get_size() assert intersec_size == s_size elif self._target is not None: assert isinstance(self._target, CartesianTopologyView) self._target_task_id = self._target.mpi_params.task_id t_size = self._target.comm.Get_size() assert intersec_size == t_size self.domain = first_not_None(self._source, self._target).domain self._rank = self.comm.Get_rank() def _build_send_recv_dict(self): # Compute local intersections : i.e. find which grid points # are on both source and target mesh. # Filter out the empty slices (due to none topologies) indices_source = { rk: sl for rk, sl in TopoTools.gather_global_indices_overlap( self._source, self.comm, self.domain ).items() if not all([_ == slice(0, 0) for _ in sl]) } indices_target = { rk: sl for rk, sl in TopoTools.gather_global_indices_overlap( self._target, self.comm, self.domain ).items() if not all([_ == slice(0, 0) for _ in sl]) } # From now on, we have indices_source[rk] = global indices (slice) # of grid points of the source on process number rk in parent. # And the same thing for indices_target. dimension = self.domain.dim # Compute the intersections of the mesh on source with every mesh on # target (i.e. for each mpi process). # 1. Source indices : if self._rank in indices_source: current = indices_source[self._rank] for rk in indices_target: inter = Utils.intersect_slices(current, indices_target[rk]) if inter is not None: self._send_indices[rk] = inter if self._source is not None: # Back to local indices convert = self._source.mesh.global_to_local self._send_indices = { rk: convert(self._send_indices[rk]) for rk in self._send_indices } data_shape = self._source.mesh.local_resolution self._send_types = TopoTools.create_subarray( self._send_indices, data_shape, dtype=self._dtype, order=self._order ) # 2. Target indices : if self._rank in indices_target: current = indices_target[self._rank] for rk in indices_source: inter = Utils.intersect_slices(current, indices_source[rk]) if inter is not None: self._recv_indices[rk] = inter if self._target is not None: convert = self._target.mesh.global_to_local self._recv_indices = { rk: convert(self._recv_indices[rk]) for rk in self._recv_indices } data_shape = self._target.mesh.local_resolution self._recv_types = TopoTools.create_subarray( self._recv_indices, data_shape, dtype=self._dtype, order=self._order )
[docs] def transfer_types(self, task_id=None): """Return the dictionnary of MPI derived types used for send (if task_id is source) or receive (if task_id is target) """ if task_id == self._source_task_id: return self._send_types if task_id == self._target_task_id: return self._recv_types
[docs] def transfer_indices(self, task_id=None): """Return the dictionnary of local indices used for send (if task_id is source) or receive (if task_id is target) """ if task_id == self._source_task_id: return self._send_indices if task_id == self._target_task_id: return self._recv_indices